package com.kaigejava.actor.deamoB.impl;

import com.kaigejava.actor.deamoB.Actor;
import com.kaigejava.actor.deamoB.dto.Message;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * An {@link Actor} that acts as a two-way routing node: it computes an integer result from each
 * incoming message and forwards that result to one of two downstream actors, depending on
 * whether the result exceeds a fixed threshold.
 *
 * <p>Incoming messages are placed on an internal queue and then drained synchronously on the
 * thread that called {@link #receive(Message)}; this class does not start any thread of its own.
 * Forwarding to the next node is therefore also a synchronous, nested call.
 *
 * @BelongsProject: kaigejavastudy
 * @BelongsPackage: com.kaigejava.actor.deamoB.impl
 * @Author: kaigejava
 * @CreateTime: 2023-08-16  14:12
 * @Description: Routing node that forwards a computed result to one of two downstream actors.
 * @Version: 1.0
 */
public class NodeActor implements Actor {
    /** Results strictly greater than this value go to {@code nextNodeIfGreater}. */
    private static final int ROUTING_THRESHOLD = 10;

    /** Duration of the simulated computation, in milliseconds. */
    private static final long SIMULATED_WORK_MILLIS = 1000L;

    /** Pending messages, consumed in FIFO order by {@link #processMessages()}. */
    private final BlockingQueue<Message> queue;

    /** Target for results greater than the threshold; may be {@code null} (result is dropped). */
    private final Actor nextNodeIfGreater;

    /** Target for results less than or equal to the threshold; may be {@code null}. */
    private final Actor nextNodeIfLessOrEqual;

    /**
     * Creates a routing node.
     *
     * @param nextNodeIfGreater     actor that receives results greater than the threshold,
     *                              or {@code null} to discard them
     * @param nextNodeIfLessOrEqual actor that receives results less than or equal to the
     *                              threshold, or {@code null} to discard them
     */
    public NodeActor(Actor nextNodeIfGreater, Actor nextNodeIfLessOrEqual) {
        this.queue = new LinkedBlockingQueue<>();
        this.nextNodeIfGreater = nextNodeIfGreater;
        this.nextNodeIfLessOrEqual = nextNodeIfLessOrEqual;
    }

    /**
     * Enqueues the message and then processes every message currently waiting in the queue on
     * the calling thread.
     *
     * <p>If the calling thread is interrupted while enqueuing, the interrupt flag is restored
     * and the method returns without processing anything.
     *
     * @param message the message to handle; its payload is cast to {@code int} when processed
     */
    @Override
    public void receive(Message message) {
        try {
            queue.put(message);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and skip processing.
            Thread.currentThread().interrupt();
            return;
        }
        processMessages();
    }

    /**
     * Drains the queue, computing a result for each message and forwarding it to the matching
     * downstream actor.
     *
     * <p>The queue is drained with the non-blocking {@code poll()} rather than an
     * {@code isEmpty()} check followed by {@code take()}: with the latter, another thread could
     * remove the last element between the check and the take, leaving this thread blocked
     * forever on an empty queue.
     */
    private void processMessages() {
        Message message;
        // The interrupt check comes first so that no message is removed from the queue once the
        // thread has been interrupted; unprocessed messages stay queued.
        while (!Thread.currentThread().isInterrupted() && (message = queue.poll()) != null) {
            int result = calculateResult(message.getPayload());
            Actor nextNode = result > ROUTING_THRESHOLD ? nextNodeIfGreater : nextNodeIfLessOrEqual;
            if (nextNode != null) {
                nextNode.receive(new Message(this, result));
            }
        }
    }

    /**
     * Computes this node's result for the given payload.
     *
     * <p>This is a placeholder for the node's real computation: it sleeps to simulate work and
     * then returns the payload unchanged. If the sleep is interrupted, the interrupt flag is
     * restored and the payload is still returned.
     *
     * @param payload the message payload; the cast below requires it to be an {@code Integer}
     * @return the payload as an {@code int}
     */
    private int calculateResult(Object payload) {
        // Assume the node's computation logic is implemented here.
        try {
            Thread.sleep(SIMULATED_WORK_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return (int) payload;
    }
}
